# Import modules
import numpy as np
from sklearn.decomposition import PCA
import glob
import matplotlib.pyplot as plt
import datetime
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from scipy.cluster.hierarchy import dendrogram
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
# Defining some functions for clustering dendrograms and reconstruction of covariance matrices
def plot_dendrogram(model, **kwargs):
    """Build a SciPy linkage matrix from a fitted clustering model and plot its dendrogram.

    Parameters
    ----------
    model : fitted sklearn AgglomerativeClustering
        Must expose `children_`, `labels_` and `distances_` (i.e. fitted with
        `distance_threshold` set or `compute_distances=True`).
    **kwargs
        Forwarded to scipy.cluster.hierarchy.dendrogram (e.g. `truncate_mode`,
        `no_plot`, `ax`).

    Returns
    -------
    dict
        The dendrogram description returned by SciPy. (The original version
        discarded it; returning it is backward-compatible and allows
        programmatic inspection, e.g. of leaf order.)
    """
    # Count the original samples under each internal node: a child index
    # below n_samples is a leaf (count 1); otherwise it refers to an earlier
    # internal node whose count is already in `counts`.
    counts = np.zeros(model.children_.shape[0])
    n_samples = len(model.labels_)
    for i, merge in enumerate(model.children_):
        current_count = 0
        for child_idx in merge:
            if child_idx < n_samples:
                current_count += 1  # leaf node
            else:
                current_count += counts[child_idx - n_samples]
        counts[i] = current_count
    # SciPy linkage row format: [child_a, child_b, distance, cluster size]
    linkage_matrix = np.column_stack(
        [model.children_, model.distances_, counts]
    ).astype(float)
    # Plot (or, with no_plot=True, just compute) the corresponding dendrogram
    return dendrogram(linkage_matrix, **kwargs)
def reconstruct_matrix(X, ind, inv_scale=True):
    """Reconstruct a symmetric matrix from a one-dimensional (full) feature vector.

    Parameters
    ----------
    X : array of shape (n_samples, n_features)
        Feature matrix; each row holds the strictly-upper-triangle (k=1)
        entries of one matrix.
    ind : int
        Row of X to reconstruct.
    inv_scale : bool
        If True, undo the global `scaler` transform before filling the matrix.

    Returns
    -------
    ndarray shaped like the global `first_covmat`, symmetric, with ones on
    the diagonal (from np.eye).

    NOTE(review): relies on module-level globals `first_covmat` (target size)
    and `scaler` (for inverse scaling), defined elsewhere in this script.
    """
    n = first_covmat.shape[0]
    iu = np.triu_indices(n, k=1)  # hoisted: was computed twice per call
    features = scaler.inverse_transform(X[ind]) if inv_scale else X[ind, :]
    recov = np.eye(n)
    recov[iu] = features
    recov.T[iu] = features  # .T is a view, so this fills the lower triangle
    return recov
def reconstruct_matrix_pca(X_pca, ind, pca, ncomp_r=None, inv_scale=True):
    """Reconstruct a symmetric matrix from a PCA-reduced feature vector.

    Parameters
    ----------
    X_pca : array of shape (n_samples, n_components)
        PCA scores; row `ind` is projected back to feature space.
    ind : int
        Row of X_pca to reconstruct.
    pca : fitted sklearn PCA
        Its `components_` map scores back to the original feature space.
    ncomp_r : int or None
        Number of leading components used in the back-projection (None = all).
    inv_scale : bool
        If True, undo the global `scaler` transform after back-projection.

    Returns
    -------
    ndarray shaped like the global `first_covmat`, symmetric, with ones on
    the diagonal.

    NOTE(review): relies on module-level globals `first_covmat` (target size)
    and `scaler`, defined elsewhere in this script.
    """
    # Back-project the reduced vector into the full feature space
    features = X_pca[ind, :ncomp_r] @ pca.components_[:ncomp_r, :]
    if inv_scale:
        features = scaler.inverse_transform(features)
    n = first_covmat.shape[0]
    iu = np.triu_indices(n, k=1)  # hoisted: was computed twice per call
    recov = np.eye(n)
    recov[iu] = features
    recov.T[iu] = features  # .T is a view, so this fills the lower triangle
    return recov
def reconstruct_centroid(X_centroid, pca, ncomp_r=None, inv_scale=False):
    """Reconstruct a symmetric matrix from a cluster-centroid vector in PCA space.

    Parameters
    ----------
    X_centroid : array of shape (n_components,)
        A single centroid in PCA score space.
    pca : fitted sklearn PCA
        Its `components_` map scores back to the original feature space.
    ncomp_r : int or None
        Number of leading components used in the back-projection (None = all).
    inv_scale : bool
        If True, undo the global `scaler` transform after back-projection.
        Defaults to False, unlike the per-sample reconstruction helpers.

    Returns
    -------
    ndarray shaped like the global `first_covmat`, symmetric, with ones on
    the diagonal.

    NOTE(review): relies on module-level globals `first_covmat` (target size)
    and `scaler`, defined elsewhere in this script.
    """
    features = X_centroid[:ncomp_r] @ pca.components_[:ncomp_r, :]
    if inv_scale:
        features = scaler.inverse_transform(features)
    n = first_covmat.shape[0]
    iu = np.triu_indices(n, k=1)  # hoisted: was computed twice per call
    recov = np.eye(n)
    recov[iu] = features
    recov.T[iu] = features  # .T is a view, so this fills the lower triangle
    return recov
# Collect all per-window matrix files; sorting puts them in chronological
# order since filenames start with the date and time.
directory = "out/covmats_band/"
files_all = sorted(glob.glob(directory + '*.npy'))
len(files_all)  # bare expression: notebook-cell residue
# Notebook output: 744
# Load one covariance matrix and take the median over a specified frequency band
fig, ax = plt.subplots(figsize=(8, 8), dpi=200)
sCh, eCh = 0, -1  # channel slice bounds; eCh = -1 drops the last channel
ind = 180  # index of the file to inspect
fb = 2  # frequency-band index; each band spans 10 frequency bins
first_covmat = np.median(np.load(files_all[ind])[fb*10:(fb+1)*10, sCh:eCh, sCh:eCh], axis=0)
# Extract date and time information from the filename.
# Assumed pattern: <YYYY-MM-DD>_<HH-MM-SS>_<...>=<window_id>.npy — TODO confirm
date, time, window_id_long = files_all[ind].split('/')[-1].split('_')
year, month, day = date.split('-')
hour, minute, second = time.split('-')
window_id = window_id_long.split('=')[-1].split('.')[0]
# Each window appears to be 20 minutes long; the id gives the offset from file start
extra_time = datetime.timedelta(minutes=int(window_id)*20)
time_str = f'{day}/{month}/{year} {hour}:{minute}:{second}'
date_format_str = '%d/%m/%Y %H:%M:%S'
given_time = datetime.datetime.strptime(time_str, date_format_str)
# Plot covariance matrix, labelled with the window's absolute timestamp
im = ax.matshow(first_covmat[:, :], vmax=0.3, interpolation='nearest')
cbar = plt.colorbar(im)
plt.xlabel("Channel number")
plt.ylabel("Channel number")
plt.title(f"Coherence matrix ({given_time + extra_time})")
plt.show()
# Only use elements above the diagonal, since the matrix is symmetric
first_uptri = np.triu(first_covmat)
first_features = first_uptri[np.triu_indices(first_covmat.shape[0], k = 1)].reshape(-1,1)
first_features.shape  # bare expression: notebook-cell residue
# Notebook output: (301476, 1)
# Build per-band feature matrices: for every file, take the median matrix over
# three frequency-bin ranges and keep only the strictly-upper-triangle entries.
# NOTE(review): the original loop read from `files_native`, which is not
# defined anywhere in this script; the file list built above is `files_all`
# (744 entries, matching the recorded 744/744 progress bar), so it is used
# here — confirm against the original notebook.
nMatrices = len(files_all)
# Number of strictly-upper-triangular elements: n*(n-1)/2
nFeatures = (first_covmat.shape[0] * first_covmat.shape[1] - first_covmat.shape[0]) // 2
X0 = np.zeros((nMatrices, nFeatures))
X1 = np.zeros((nMatrices, nFeatures))
X2 = np.zeros((nMatrices, nFeatures))
# Hoisted out of the loop: the index arrays are identical for every file
triu_idx = np.triu_indices(first_covmat.shape[0], k=1)
for k in tqdm(range(nMatrices)):
    full_covmat = np.load(files_all[k])[:, :]
    # Median over each of the three frequency bands (bins 0-19, 20-39, 40-end);
    # indexing the upper triangle directly makes the np.triu copies unnecessary.
    X0[k, :] = np.median(full_covmat[:20, :, :], axis=0)[triu_idx]
    X1[k, :] = np.median(full_covmat[20:40, :, :], axis=0)[triu_idx]
    X2[k, :] = np.median(full_covmat[40:, :, :], axis=0)[triu_idx]
# Notebook output (was a syntax error left in the source):
# 100%|==========| 744/744 [37:40<00:00, 3.04s/it]
# Run PCA on the whole dataset and reduce to 100 eigenvectors / eigenvalues.
# Clustering will be performed on the component scores, while the eigenvectors
# give an idea of the characteristics of the data.
n_comp = 100 # Reduce to 100 dimensions
pca = PCA(n_components=n_comp)
pca.fit(X1)  # X1 = middle frequency band (bins 20-39)
X_pca = pca.transform(X1)
X_pca.shape  # bare expression: notebook-cell residue
# Notebook output: (744, 100)
# Plotting the first 15 eigenvectors.
# Eigenvector 0 emphasizes a noise source around channel 100. Eigenvector 1
# emphasizes a noise source around channel 600. Eigenvector 2 shows a noise
# source around channel number 200 and eigenvector 4 has high contribution at
# the very first channels. In this example, higher eigenvectors don't describe
# individual sources but give more detail to fully describe the original
# coherence matrices.
vmax_eig = 0.05  # symmetric colour-scale limit shared by all panels
fig, axs = plt.subplots(nrows=3, ncols=5, figsize=(15, 9),
                        sharex=True, sharey=True, dpi=300)
for ax_id, ax in enumerate(axs.reshape(-1)):
    # Reshape the ax_id-th PCA component back into matrix form. No inverse
    # scaling: the components live in the (scaled) feature space.
    first_eigvec = reconstruct_matrix(pca.components_, ind=ax_id, inv_scale=False)
    im2 = ax.matshow(first_eigvec, vmax=vmax_eig, vmin=-vmax_eig, cmap='RdGy')
    ax.set_title(f"Eigenvector {ax_id}")
plt.tight_layout()
fig.subplots_adjust(right=0.93)  # leave room for the shared colorbar
cbar_ax = fig.add_axes([0.94, 0.15, 0.02, 0.7])
fig.colorbar(im2, cax=cbar_ax)
plt.show()